Ładowanie bibliotek¶

In [ ]:
import warnings
import pandas as pd
import matplotlib.pyplot as plt
import numpy as np
import folium

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.functions import trim
from pyspark.sql.types import FloatType
from pyspark.sql.functions import udf, row_number
from pyspark.sql.window import Window

warnings.filterwarnings("ignore")
spark = SparkSession.builder.getOrCreate()

Zadanie 1¶

In [ ]:
country_path = "countries of the world.csv"
df_countries = spark.read.csv(country_path, inferSchema = True,header = True)
df_countries.printSchema()
root
 |-- Country: string (nullable = true)
 |-- Region: string (nullable = true)
 |-- Population: integer (nullable = true)
 |-- Area (square miles): integer (nullable = true)
 |-- Pop. Density (per sq. mi.): string (nullable = true)
 |-- Coastline (coast/area ratio): string (nullable = true)
 |-- Net migration: string (nullable = true)
 |-- Infant mortality (per 1000 births): string (nullable = true)
 |-- GDP ($ per capita): integer (nullable = true)
 |-- Literacy (%): string (nullable = true)
 |-- Phones (per 1000): string (nullable = true)
 |-- Arable (%): string (nullable = true)
 |-- Crops (%): string (nullable = true)
 |-- Other (%): string (nullable = true)
 |-- Climate: string (nullable = true)
 |-- Birthrate: string (nullable = true)
 |-- Deathrate: string (nullable = true)
 |-- Agriculture: string (nullable = true)
 |-- Industry: string (nullable = true)
 |-- Service: string (nullable = true)

In [ ]:
df_countries.toPandas()
Out[ ]:
Country Region Population Area (square miles) Pop. Density (per sq. mi.) Coastline (coast/area ratio) Net migration Infant mortality (per 1000 births) GDP ($ per capita) Literacy (%) Phones (per 1000) Arable (%) Crops (%) Other (%) Climate Birthrate Deathrate Agriculture Industry Service
0 Afghanistan ASIA (EX. NEAR EAST) 31056997 647500 48,0 0,00 23,06 163,07 700.0 36,0 3,2 12,13 0,22 87,65 1 46,6 20,34 0,38 0,24 0,38
1 Albania EASTERN EUROPE 3581655 28748 124,6 1,26 -4,93 21,52 4500.0 86,5 71,2 21,09 4,42 74,49 3 15,11 5,22 0,232 0,188 0,579
2 Algeria NORTHERN AFRICA 32930091 2381740 13,8 0,04 -0,39 31 6000.0 70,0 78,1 3,22 0,25 96,53 1 17,14 4,61 0,101 0,6 0,298
3 American Samoa OCEANIA 57794 199 290,4 58,29 -20,71 9,27 8000.0 97,0 259,5 10 15 75 2 22,46 3,27 None None None
4 Andorra WESTERN EUROPE 71201 468 152,1 0,00 6,6 4,05 19000.0 100,0 497,2 2,22 0 97,78 3 8,71 6,25 None None None
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
222 West Bank NEAR EAST 2460492 5860 419,9 0,00 2,98 19,62 800.0 None 145,2 16,9 18,97 64,13 3 31,67 3,92 0,09 0,28 0,63
223 Western Sahara NORTHERN AFRICA 273008 266000 1,0 0,42 None None NaN None None 0,02 0 99,98 1 None None None None 0,4
224 Yemen NEAR EAST 21456188 527970 40,6 0,36 0 61,5 800.0 50,2 37,2 2,78 0,24 96,98 1 42,89 8,3 0,135 0,472 0,393
225 Zambia SUB-SAHARAN AFRICA 11502010 752614 15,3 0,00 0 88,29 800.0 80,6 8,2 7,08 0,03 92,9 2 41 19,93 0,22 0,29 0,489
226 Zimbabwe SUB-SAHARAN AFRICA 12236805 390580 31,3 0,00 0 67,69 1900.0 90,7 26,8 8,32 0,34 91,34 2 28,01 21,84 0,179 0,243 0,579

227 rows × 20 columns

In [ ]:
# data processing
countries_poor = df_countries.select('GDP ($ per capita)','Birthrate','Deathrate').filter(col('GDP ($ per capita)').isNotNull()).orderBy('GDP ($ per capita)').limit(30).toPandas()
countries_poor['Birthrate'] = pd.to_numeric(countries_poor['Birthrate'].str.replace(',','.'))
countries_poor['Deathrate'] = pd.to_numeric(countries_poor['Deathrate'].str.replace(',','.'))
countries_poor.head(5)
Out[ ]:
GDP ($ per capita) Birthrate Deathrate
0 500 26.99 6.24
1 500 45.76 23.03
2 500 45.13 16.63
3 600 42.22 13.46
4 600 39.45 3.80
In [ ]:
fig, ax = plt.subplots()

ax.scatter(countries_poor['GDP ($ per capita)'],countries_poor['Birthrate'],color='blue',label='birthrate')
ax.scatter(countries_poor['GDP ($ per capita)'],countries_poor['Deathrate'],color='red',label='deathrate')
ax.set_xlabel('GDP')
ax.set_ylabel('Rate')
ax.legend()
plt.grid(True)
plt.show()
No description has been provided for this image
In [ ]:
countries_poor_binned = countries_poor.groupby('GDP ($ per capita)').agg({'Birthrate' : [np.mean, np.std], 'Deathrate': [np.mean,np.std]})
countries_poor_binned = countries_poor_binned.fillna(0)
In [ ]:
fig, ax = plt.subplots()

ax.scatter(
    countries_poor['GDP ($ per capita)'],
    countries_poor['Birthrate'],
    color='blue',
    alpha=0.1)

ax.scatter(
    countries_poor_binned.index.values,
    countries_poor_binned[('Birthrate','mean')],
    color='blue',
    label='birthrate',
    s=100)

ax.errorbar(
    countries_poor_binned.index.values,
    countries_poor_binned[('Birthrate','mean')],
    yerr=countries_poor_binned[('Birthrate','std')],
    fmt='none',
    capsize=3
)
ax.scatter(
    countries_poor['GDP ($ per capita)'],
    countries_poor['Deathrate'],
    color='red',
    alpha=0.1)

ax.scatter(
    countries_poor_binned.index.values,
    countries_poor_binned[('Deathrate','mean')],
    color='red',
    label='deathrate',
    s=100)

ax.errorbar(
    countries_poor_binned.index.values,
    countries_poor_binned[('Deathrate','mean')],
    yerr=countries_poor_binned[('Deathrate','std')],
    fmt='none',
    capsize=3,
    ecolor='r'
)

ax.set_xlabel('GDP')
ax.set_ylabel('Rate')
ax.legend()
plt.grid(True)
plt.show()
No description has been provided for this image
In [ ]:
countries_rich = df_countries.select('GDP ($ per capita)','Birthrate','Deathrate').filter(col('GDP ($ per capita)').isNotNull()).orderBy(col('GDP ($ per capita)').desc()).limit(30).toPandas()
countries_rich['Birthrate'] = pd.to_numeric(countries_rich['Birthrate'].str.replace(',','.'))
countries_rich['Deathrate'] = pd.to_numeric(countries_rich['Deathrate'].str.replace(',','.'))
In [ ]:
fig, ax = plt.subplots()

ax.scatter(countries_rich['GDP ($ per capita)'],countries_rich['Birthrate'],color='blue',label='birthrate')
ax.scatter(countries_rich['GDP ($ per capita)'],countries_rich['Deathrate'],color='red',label='deathrate')
ax.set_xlabel('GDP')
ax.set_ylabel('Rate')
ax.legend()
plt.grid(True)
plt.show()
No description has been provided for this image
In [ ]:
bins = np.arange(countries_rich['GDP ($ per capita)'].min(),countries_rich['GDP ($ per capita)'].max()+2000,2000)
points, births_mean, births_std, deaths_mean, deaths_std = [], [], [], [], []
for i in range(1,len(bins)):
    index = (countries_rich['GDP ($ per capita)'] >= bins[i-1]) & (countries_rich['GDP ($ per capita)'] < bins[i])
    births = countries_rich.loc[index,'Birthrate']
    deaths = countries_rich.loc[index,'Deathrate']
    if len(births) > 0:
        points.append(bins[i-1]+1000)
        births_mean.append(births.mean())
        births_std.append(births.std())
        deaths_mean.append(deaths.mean())
        deaths_std.append(deaths.std())
In [ ]:
fig, ax = plt.subplots()

ax.scatter(
    countries_rich['GDP ($ per capita)'],
    countries_rich['Birthrate'],
    color='blue',
    alpha=0.1)

ax.scatter(
    points,
    births_mean,
    color='blue',
    label='birthrate',
    s=100)

ax.errorbar(
    points,
    births_mean,
    yerr=births_std,
    fmt='none',
    capsize=3
)
ax.scatter(
    countries_rich['GDP ($ per capita)'],
    countries_rich['Deathrate'],
    color='red',
    alpha=0.1)

ax.scatter(
    points,
    deaths_mean,
    color='red',
    label='deathrate',
    s=100)

ax.errorbar(
    points,
    deaths_mean,
    yerr=deaths_std,
    fmt='none',
    capsize=3,
    ecolor='r'
)

ax.set_xlabel('GDP')
ax.set_ylabel('Rate')
ax.legend()
plt.grid(True)
plt.show()
No description has been provided for this image

Zadanie 2¶

In [ ]:
airports_path = "airports.csv"
In [ ]:
airports = spark.read.csv(airports_path, inferSchema = True)
In [ ]:
airports.toPandas()
Out[ ]:
_c0 _c1 _c2 _c3 _c4 _c5 _c6 _c7 _c8 _c9 _c10 _c11 _c12 _c13
0 1 Goroka Airport Goroka Papua New Guinea GKA AYGA -6.081690 145.391998 5282 10 U Pacific/Port_Moresby airport OurAirports
1 2 Madang Airport Madang Papua New Guinea MAG AYMD -5.207080 145.789001 20 10 U Pacific/Port_Moresby airport OurAirports
2 3 Mount Hagen Kagamuga Airport Mount Hagen Papua New Guinea HGU AYMH -5.826790 144.296005 5388 10 U Pacific/Port_Moresby airport OurAirports
3 4 Nadzab Airport Nadzab Papua New Guinea LAE AYNZ -6.569803 146.725977 239 10 U Pacific/Port_Moresby airport OurAirports
4 5 Port Moresby Jacksons International Airport Port Moresby Papua New Guinea POM AYPY -9.443380 147.220001 146 10 U Pacific/Port_Moresby airport OurAirports
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
7179 12053 Rugao Air Base Rugao China RUG ZSRG 32.257885 120.501656 0 \N \N \N airport OurAirports
7180 12054 Wuhu Air Base Wuhu China WHU ZSWU 31.390600 118.408997 0 \N \N \N airport OurAirports
7181 12055 Shanshan Airport Shanshan China SXJ ZWSS 42.911701 90.247498 0 \N \N \N airport OurAirports
7182 12056 Yingkou Lanqi Airport Yingkou China YKH ZYYK 40.542524 122.358600 0 \N \N \N airport OurAirports
7183 12057 Shenyang Dongta Airport Shenyang China \N ZYYY 41.784401 123.496002 0 \N \N \N airport OurAirports

7184 rows × 14 columns

In [ ]:
airports = airports.toDF('Airport ID','Name','City','Country','IATA','ICAO','Latitude','Longitude','Altitude','Timezone','DST','Tz','Type','Source')
airports = airports.withColumn('Country',trim(col('Country')))
airports.toPandas()
Out[ ]:
Airport ID Name City Country IATA ICAO Latitude Longitude Altitude Timezone DST Tz Type Source
0 1 Goroka Airport Goroka Papua New Guinea GKA AYGA -6.081690 145.391998 5282 10 U Pacific/Port_Moresby airport OurAirports
1 2 Madang Airport Madang Papua New Guinea MAG AYMD -5.207080 145.789001 20 10 U Pacific/Port_Moresby airport OurAirports
2 3 Mount Hagen Kagamuga Airport Mount Hagen Papua New Guinea HGU AYMH -5.826790 144.296005 5388 10 U Pacific/Port_Moresby airport OurAirports
3 4 Nadzab Airport Nadzab Papua New Guinea LAE AYNZ -6.569803 146.725977 239 10 U Pacific/Port_Moresby airport OurAirports
4 5 Port Moresby Jacksons International Airport Port Moresby Papua New Guinea POM AYPY -9.443380 147.220001 146 10 U Pacific/Port_Moresby airport OurAirports
... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
7179 12053 Rugao Air Base Rugao China RUG ZSRG 32.257885 120.501656 0 \N \N \N airport OurAirports
7180 12054 Wuhu Air Base Wuhu China WHU ZSWU 31.390600 118.408997 0 \N \N \N airport OurAirports
7181 12055 Shanshan Airport Shanshan China SXJ ZWSS 42.911701 90.247498 0 \N \N \N airport OurAirports
7182 12056 Yingkou Lanqi Airport Yingkou China YKH ZYYK 40.542524 122.358600 0 \N \N \N airport OurAirports
7183 12057 Shenyang Dongta Airport Shenyang China \N ZYYY 41.784401 123.496002 0 \N \N \N airport OurAirports

7184 rows × 14 columns

In [ ]:
airports_coordinates = airports.select('Name','Longitude','Latitude').na.drop().toPandas()
In [ ]:
m = folium.Map()

for _, row in airports_coordinates.iterrows():
    folium.Marker(
        location=[row['Latitude'], row['Longitude']],
        popup=row['Name'],
    ).add_to(m)
   
m
Out[ ]:
Make this Notebook Trusted to load map: File -> Trust Notebook

Zadanie 3¶

In [ ]:
airports.printSchema()
root
 |-- Airport ID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- IATA: string (nullable = true)
 |-- ICAO: string (nullable = true)
 |-- Latitude: double (nullable = true)
 |-- Longitude: double (nullable = true)
 |-- Altitude: integer (nullable = true)
 |-- Timezone: string (nullable = true)
 |-- DST: string (nullable = true)
 |-- Tz: string (nullable = true)
 |-- Type: string (nullable = true)
 |-- Source: string (nullable = true)

In [ ]:
def feet_to_meters(s) -> float:
    return s * 0.3048
In [ ]:
float_udf = udf(feet_to_meters, FloatType())
In [ ]:
airports_converted = airports.withColumn('Meters above sea level', float_udf('Altitude'))
In [ ]:
windowsspec = Window.partitionBy('Country').orderBy(col('Meters above sea level').asc())
airports_ordered = airports_converted.select('Country','Meters above sea level','Longitude','Latitude','Name') \
.withColumn('rank',row_number().over(windowsspec))
airports_10_highest = airports_ordered.filter(col('rank') == 1).drop('rank').orderBy(col('Meters above sea level').desc()).limit(10)
airporst_highest_df = airports_10_highest.toPandas()
In [ ]:
m = folium.Map()

for _, row in airporst_highest_df.iterrows():
    folium.Marker(
        location=[row['Latitude'], row['Longitude']],
        popup=f"{row['Name']}\n{row['Meters above sea level']} m.n.p.m",
    ).add_to(m)
    
m
Out[ ]:
Make this Notebook Trusted to load map: File -> Trust Notebook

Zadanie 4¶

In [ ]:
file_path = "countries of the world.csv"

countries = spark.read.csv(file_path, inferSchema = True, header = True)
countries = countries.withColumn('Country',trim(col('Country')))
In [ ]:
countries.toPandas()
Out[ ]:
Country Region Population Area (square miles) Pop. Density (per sq. mi.) Coastline (coast/area ratio) Net migration Infant mortality (per 1000 births) GDP ($ per capita) Literacy (%) Phones (per 1000) Arable (%) Crops (%) Other (%) Climate Birthrate Deathrate Agriculture Industry Service
0 Afghanistan ASIA (EX. NEAR EAST) 31056997 647500 48,0 0,00 23,06 163,07 700.0 36,0 3,2 12,13 0,22 87,65 1 46,6 20,34 0,38 0,24 0,38
1 Albania EASTERN EUROPE 3581655 28748 124,6 1,26 -4,93 21,52 4500.0 86,5 71,2 21,09 4,42 74,49 3 15,11 5,22 0,232 0,188 0,579
2 Algeria NORTHERN AFRICA 32930091 2381740 13,8 0,04 -0,39 31 6000.0 70,0 78,1 3,22 0,25 96,53 1 17,14 4,61 0,101 0,6 0,298
3 American Samoa OCEANIA 57794 199 290,4 58,29 -20,71 9,27 8000.0 97,0 259,5 10 15 75 2 22,46 3,27 None None None
4 Andorra WESTERN EUROPE 71201 468 152,1 0,00 6,6 4,05 19000.0 100,0 497,2 2,22 0 97,78 3 8,71 6,25 None None None
... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...
222 West Bank NEAR EAST 2460492 5860 419,9 0,00 2,98 19,62 800.0 None 145,2 16,9 18,97 64,13 3 31,67 3,92 0,09 0,28 0,63
223 Western Sahara NORTHERN AFRICA 273008 266000 1,0 0,42 None None NaN None None 0,02 0 99,98 1 None None None None 0,4
224 Yemen NEAR EAST 21456188 527970 40,6 0,36 0 61,5 800.0 50,2 37,2 2,78 0,24 96,98 1 42,89 8,3 0,135 0,472 0,393
225 Zambia SUB-SAHARAN AFRICA 11502010 752614 15,3 0,00 0 88,29 800.0 80,6 8,2 7,08 0,03 92,9 2 41 19,93 0,22 0,29 0,489
226 Zimbabwe SUB-SAHARAN AFRICA 12236805 390580 31,3 0,00 0 67,69 1900.0 90,7 26,8 8,32 0,34 91,34 2 28,01 21,84 0,179 0,243 0,579

227 rows × 20 columns

In [ ]:
merged_data = airports.select('Country','Airport ID').join(countries,'Country','inner').toPandas()
merged_data.head(5)
Out[ ]:
Country Airport ID Region Population Area (square miles) Pop. Density (per sq. mi.) Coastline (coast/area ratio) Net migration Infant mortality (per 1000 births) GDP ($ per capita) ... Phones (per 1000) Arable (%) Crops (%) Other (%) Climate Birthrate Deathrate Agriculture Industry Service
0 Papua New Guinea 1 OCEANIA 5670544 462840 12,3 1,11 0 51,45 2200.0 ... 10,9 0,46 1,44 98,1 2 29,36 7,25 0,353 0,381 0,266
1 Papua New Guinea 2 OCEANIA 5670544 462840 12,3 1,11 0 51,45 2200.0 ... 10,9 0,46 1,44 98,1 2 29,36 7,25 0,353 0,381 0,266
2 Papua New Guinea 3 OCEANIA 5670544 462840 12,3 1,11 0 51,45 2200.0 ... 10,9 0,46 1,44 98,1 2 29,36 7,25 0,353 0,381 0,266
3 Papua New Guinea 4 OCEANIA 5670544 462840 12,3 1,11 0 51,45 2200.0 ... 10,9 0,46 1,44 98,1 2 29,36 7,25 0,353 0,381 0,266
4 Papua New Guinea 5 OCEANIA 5670544 462840 12,3 1,11 0 51,45 2200.0 ... 10,9 0,46 1,44 98,1 2 29,36 7,25 0,353 0,381 0,266

5 rows × 21 columns

In [ ]:
airport_countries = airports.select('Country').withColumnRenamed('Country','Country_airport')
miss_countries = airport_countries.select('Country_airport') \
.join(countries.select('Country'),airport_countries.Country_airport == countries.Country,'outer') \
.where((col('Country_airport').isNull()) | (col('Country').isNull()))  \
.toPandas()
miss_countries.head(5)
Out[ ]:
Country_airport Country
0 None Andorra
1 Antarctica None
2 Antarctica None
3 Antarctica None
4 Antarctica None
In [ ]:
merged_data = airports.select('Country','Airport ID').join(countries,'Country','inner')
airport_by_country = merged_data.select('Country','Airport ID').groupBy('Country').count()
airports_by_land_area = airport_by_country.join(countries.select('Country','Area (square miles)'), 'Country', 'inner')
In [ ]:
airports_by_land_area.toPandas().plot(x='Area (square miles)',y='count',kind='scatter')
Out[ ]:
<Axes: xlabel='Area (square miles)', ylabel='count'>
No description has been provided for this image
In [ ]:
airports_by_land_area.filter(col('Area (square miles)') < 500000).toPandas().plot(x='Area (square miles)',y='count',kind='scatter')
Out[ ]:
<Axes: xlabel='Area (square miles)', ylabel='count'>
No description has been provided for this image

Zadanie 5¶

In [ ]:
airport_by_region = merged_data.select('Region','Airport ID').groupBy('Region').count()
In [ ]:
airport_by_region.toPandas()
Out[ ]:
Region count
0 BALTICS 24
1 C.W. OF IND. STATES 357
2 ASIA (EX. NEAR EAST) 1064
3 WESTERN EUROPE 1224
4 NORTHERN AMERICA 1875
5 NEAR EAST 208
6 EASTERN EUROPE 143
7 OCEANIA 542
8 SUB-SAHARAN AFRICA 534
9 NORTHERN AFRICA 121
10 LATIN AMER. & CARIB 921
In [ ]:
airport_by_region.toPandas().set_index('Region').plot(y='count',kind='pie',autopct='%1.0f%%',figsize=(10, 10))
Out[ ]:
<Axes: ylabel='count'>
No description has been provided for this image